/*

 * Licensed to the Apache Software Foundation (ASF) under one

 * or more contributor license agreements.  See the NOTICE file

 * distributed with this work for additional information

 * regarding copyright ownership.  The ASF licenses this file

 * to you under the Apache License, Version 2.0 (the

 * "License"); you may not use this file except in compliance

 * with the License.  You may obtain a copy of the License at

 *

 *     http://www.apache.org/licenses/LICENSE-2.0

 *

 * Unless required by applicable law or agreed to in writing, software

 * distributed under the License is distributed on an "AS IS" BASIS,

 * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.

 * See the License for the specific language governing permissions and

 * limitations under the License.

 */

package com.bff.gaia.unified.sdk.extensions.euphoria.core.client.operator;



import com.bff.gaia.unified.sdk.annotations.Internal;

import com.bff.gaia.unified.sdk.extensions.euphoria.core.annotation.audience.Audience;

import com.bff.gaia.unified.sdk.extensions.euphoria.core.annotation.operator.Recommended;

import com.bff.gaia.unified.sdk.extensions.euphoria.core.annotation.operator.StateComplexity;

import com.bff.gaia.unified.sdk.extensions.euphoria.core.client.functional.*;

import com.bff.gaia.unified.sdk.extensions.euphoria.core.client.functional.BinaryFunction;

import com.bff.gaia.unified.sdk.extensions.euphoria.core.client.functional.CombinableReduceFunction;

import com.bff.gaia.unified.sdk.extensions.euphoria.core.client.io.Collector;

import com.bff.gaia.unified.sdk.extensions.euphoria.core.client.operator.base.Builders;

import com.bff.gaia.unified.sdk.extensions.euphoria.core.client.operator.base.OptionalMethodBuilder;

import com.bff.gaia.unified.sdk.extensions.euphoria.core.client.operator.base.ShuffleOperator;

import com.bff.gaia.unified.sdk.extensions.euphoria.core.client.operator.hint.OutputHint;

import com.bff.gaia.unified.sdk.extensions.euphoria.core.client.type.TypeAware;

import com.bff.gaia.unified.sdk.extensions.euphoria.core.client.type.TypeAwareness;

import com.bff.gaia.unified.sdk.extensions.euphoria.core.translate.OperatorTransform;

import com.bff.gaia.unified.sdk.transforms.windowing.*;

import com.bff.gaia.unified.sdk.values.*;

import com.bff.gaia.unified.sdk.extensions.euphoria.core.client.functional.ReduceFunction;

import com.bff.gaia.unified.sdk.extensions.euphoria.core.client.functional.ReduceFunctor;

import com.bff.gaia.unified.sdk.extensions.euphoria.core.client.functional.UnaryFunction;

import com.bff.gaia.unified.sdk.transforms.windowing.BoundedWindow;

import com.bff.gaia.unified.sdk.transforms.windowing.TimestampCombiner;

import com.bff.gaia.unified.sdk.transforms.windowing.Trigger;

import com.bff.gaia.unified.sdk.transforms.windowing.Window;

import com.bff.gaia.unified.sdk.transforms.windowing.WindowFn;

import com.bff.gaia.unified.sdk.values.KV;

import com.bff.gaia.unified.sdk.values.PCollection;

import com.bff.gaia.unified.sdk.values.PCollectionList;

import com.bff.gaia.unified.sdk.values.TypeDescriptor;

import com.bff.gaia.unified.sdk.values.TypeDescriptors;

import com.bff.gaia.unified.sdk.values.WindowingStrategy;

import org.joda.time.Duration;



import javax.annotation.Nullable;

import java.util.Optional;

import java.util.stream.Stream;



import static java.util.Objects.requireNonNull;



/**

 * Operator performing state-less aggregation by given reduce function. The reduction is performed

 * on all extracted values on each key-window.

 *

 * <p>If provided function is {@link CombinableReduceFunction} partial reduction is performed before

 * shuffle. If the function is not combinable all values must be first sent through the network and

 * the reduction is done afterwards on target machines.

 *

 * <p>Custom windowing can be set; otherwise the windowing of the input dataset is used.

 *

 * <h3>Builders:</h3>

 *

 * <ol>

 *   <li>{@code [named] ..................} give name to the operator [optional]

 *   <li>{@code of .......................} input dataset

 *   <li>{@code keyBy ....................} key extractor function

 *   <li>{@code [valueBy] ................} value extractor function (default: identity)

 *   <li>{@code (combineBy | reduceBy)....} {@link CombinableReduceFunction} or {@link

 *       ReduceFunction} for combinable or non-combinable function

 *   <li>{@code [withSortedValues] .......} use comparator for sorting values prior to being passed

 *       to {@link ReduceFunction} function (applicable only for non-combinable version)

 *   <li>{@code [windowBy] ...............} windowing (see {@link WindowFn}), default is no

 *       windowing

 *   <li>{@code [triggeredBy] ............} defines windowing trigger, follows [windowBy] if called

 *   <li>{@code [accumulationMode] .......} windowing accumulation mode, follows [triggeredBy]

 *   <li>{@code (output | outputValues) ..} build output dataset

 * </ol>

 *

 * @param <InputT> Type of input records

 * @param <KeyT> Output type of #keyBy method

 * @param <ValueT> Output type of #valueBy method

 * @param <OutputT> Type of output value

 */

@Audience(Audience.Type.CLIENT)

@Recommended(

    reason =

        "Is very recommended to override because of performance in "

            + "a specific area of (mostly) batch calculations where combiners "

            + "can be efficiently used in the executor-specific implementation",

    state = StateComplexity.CONSTANT_IF_COMBINABLE,

    repartitions = 1)

public class ReduceByKey<InputT, KeyT, ValueT, OutputT>

    extends ShuffleOperator<InputT, KeyT, KV<KeyT, OutputT>> implements TypeAware.Value<ValueT> {



  /**
   * Begins the definition of an unnamed {@link ReduceByKey} operator over the supplied input.
   *
   * <p>Shorthand for {@code named(null).of(input)}.
   *
   * @param <InputT> element type of the input dataset
   * @param input dataset whose elements are to be reduced
   * @return builder for the following ('keyBy') step
   * @see #named(String)
   * @see OfBuilder#of(PCollection)
   */
  public static <InputT> KeyByBuilder<InputT> of(PCollection<InputT> input) {
    final OfBuilder unnamed = named(null);
    return unnamed.of(input);
  }



  /**
   * Starts building a named {@link ReduceByKey} operator.
   *
   * @param name a user provided name of the new operator to build; may be {@code null} for a
   *     nameless operator
   * @return a builder to complete the setup of the new operator
   */
  public static OfBuilder named(@Nullable String name) {
    // Diamond instead of a raw type: the concrete type arguments are only established by the
    // subsequent builder steps, which re-type this same instance.
    return new Builder<>(name);
  }



  /**
   * Builder for 'of' step.
   *
   * <p>This is the type returned by {@link ReduceByKey#named(String)}; it fixes the input dataset
   * and hands over to the 'keyBy' step.
   */
  public interface OfBuilder extends Builders.Of {

    /**
     * Sets the input dataset of the operator being built.
     *
     * @param <InputT> type of elements of the input dataset
     * @param input the dataset to be processed
     * @return builder for the 'keyBy' step
     */
    @Override
    <InputT> KeyByBuilder<InputT> of(PCollection<InputT> input);
  }



  /**
   * Builder for the 'keyBy' step, which decides how the grouping key is derived from each input
   * element.
   *
   * <p>After this step the value type of the returned builder is still {@code InputT}; it only
   * changes if a 'valueBy' step follows.
   *
   * @param <InputT> type of elements of the input dataset
   */
  public interface KeyByBuilder<InputT> extends Builders.KeyBy<InputT> {

    /**
     * Sets the key extractor together with a description of the key type.
     *
     * @param <T> type of the extracted key
     * @param keyExtractor function deriving the key from an input element
     * @param keyType {@link TypeDescriptor} of the key type {@code <T>}
     * @return builder for the 'valueBy' / 'reduceBy' step
     */
    @Override
    <T> ValueByReduceByBuilder<InputT, T, InputT> keyBy(
        UnaryFunction<InputT, T> keyExtractor, TypeDescriptor<T> keyType);

    /**
     * Sets the key extractor without describing the key type; delegates to {@link
     * #keyBy(UnaryFunction, TypeDescriptor)} with a {@code null} type descriptor.
     *
     * @param <T> type of the extracted key
     * @param keyExtractor function deriving the key from an input element
     * @return builder for the 'valueBy' / 'reduceBy' step
     */
    @Override
    default <T> ValueByReduceByBuilder<InputT, T, InputT> keyBy(
        UnaryFunction<InputT, T> keyExtractor) {
      final TypeDescriptor<T> unspecifiedKeyType = null;
      return keyBy(keyExtractor, unspecifiedKeyType);
    }
  }



  /**
   * Builder for 'reduceBy' step.
   *
   * <p>All convenience overloads funnel into {@link #reduceBy(ReduceFunctor, TypeDescriptor)}.
   * They validate their {@code reducer} argument eagerly, so a {@code null} reducer is reported
   * when the operator is being built rather than when the reduction runs.
   *
   * @param <KeyT> type of the grouping key
   * @param <ValueT> type of the values passed to the reducer
   */
  public interface ReduceByBuilder<KeyT, ValueT> {

    /**
     * Define a function that reduces all values related to one key into one result object. The
     * function is not combinable - i.e. partial results cannot be made up before shuffle. To get
     * better performance use {@link #combineBy} method.
     *
     * @param <OutputT> type of output element
     * @param reducer function that reduces all values into one output object
     * @return next builder to complete the setup of the {@link ReduceByKey} operator
     * @throws NullPointerException if {@code reducer} is {@code null}
     */
    default <OutputT> WithSortedValuesBuilder<KeyT, ValueT, OutputT> reduceBy(
        ReduceFunction<ValueT, OutputT> reducer) {
      // The lambda below is never null itself, so a null reducer has to be caught here; it would
      // otherwise only fail once the functor is invoked.
      requireNonNull(reducer, "reducer");
      return reduceBy(
          (Stream<ValueT> in, Collector<OutputT> ctx) -> ctx.collect(reducer.apply(in)));
    }

    /**
     * Same as {@link #reduceBy(ReduceFunction)}, additionally describing the output type.
     *
     * @param <OutputT> type of output element
     * @param reducer function that reduces all values into one output object
     * @param outputType {@link TypeDescriptor} of the output type {@code <OutputT>}
     * @return next builder to complete the setup of the {@link ReduceByKey} operator
     * @throws NullPointerException if {@code reducer} is {@code null}
     */
    default <OutputT> WithSortedValuesBuilder<KeyT, ValueT, OutputT> reduceBy(
        ReduceFunction<ValueT, OutputT> reducer, TypeDescriptor<OutputT> outputType) {
      // See the single-argument overload: the wrapping lambda hides a null reducer.
      requireNonNull(reducer, "reducer");
      return reduceBy(
          (Stream<ValueT> in, Collector<OutputT> ctx) -> ctx.collect(reducer.apply(in)),
          outputType);
    }

    /**
     * Define a function that reduces all values related to one key into one or more result objects.
     * The function is not combinable - i.e. partial results cannot be made up before shuffle. To
     * get better performance use {@link #combineBy} method.
     *
     * @param <OutputT> type of output element
     * @param reducer function that reduces all values into output values
     * @return next builder to complete the setup of the {@link ReduceByKey} operator
     */
    default <OutputT> WithSortedValuesBuilder<KeyT, ValueT, OutputT> reduceBy(
        ReduceFunctor<ValueT, OutputT> reducer) {
      return reduceBy(reducer, null);
    }

    /**
     * Define a function that reduces all values related to one key into one or more result
     * objects, optionally describing the output type. Every other method of this interface
     * delegates here.
     *
     * @param <OutputT> type of output element
     * @param reducer function that reduces all values into output values
     * @param outputType {@link TypeDescriptor} of the output type {@code <OutputT>}, or {@code
     *     null} when it is not specified
     * @return next builder to complete the setup of the {@link ReduceByKey} operator
     */
    <OutputT> WithSortedValuesBuilder<KeyT, ValueT, OutputT> reduceBy(
        ReduceFunctor<ValueT, OutputT> reducer, @Nullable TypeDescriptor<OutputT> outputType);

    /**
     * Define a function that reduces all values related to one key into one result object. The
     * function is combinable (associative and commutative) so it can be used to compute partial
     * results before shuffle.
     *
     * @param reducer function that reduces all values into one output object
     * @return next builder to complete the setup of the {@link ReduceByKey} operator
     * @throws NullPointerException if {@code reducer} is {@code null}
     */
    default WindowByBuilder<KeyT, ValueT> combineBy(CombinableReduceFunction<ValueT> reducer) {
      requireNonNull(reducer, "reducer");
      return reduceBy(ReduceFunctor.of(reducer));
    }

    /**
     * Same as {@link #combineBy(CombinableReduceFunction)}, additionally describing the output
     * type, which for a combinable function equals the value type.
     *
     * @param reducer function that reduces all values into one output object
     * @param outputType {@link TypeDescriptor} of the output (and value) type {@code <ValueT>}
     * @return next builder to complete the setup of the {@link ReduceByKey} operator
     * @throws NullPointerException if {@code reducer} is {@code null}
     */
    default WindowByBuilder<KeyT, ValueT> combineBy(
        CombinableReduceFunction<ValueT> reducer, TypeDescriptor<ValueT> outputType) {
      requireNonNull(reducer, "reducer");
      return reduceBy(ReduceFunctor.of(reducer), outputType);
    }
  }



  /**
   * Builder for the optional 'valueBy' step; because it extends {@link ReduceByBuilder}, the
   * 'reduceBy' step can also be invoked directly on it.
   *
   * @param <InputT> type of elements of the input dataset
   * @param <KeyT> type of the grouping key
   * @param <ValueT> type of the values reduced when no 'valueBy' step is used
   */
  public interface ValueByReduceByBuilder<InputT, KeyT, ValueT>
      extends ReduceByBuilder<KeyT, ValueT> {

    /**
     * Chooses the function that turns each input element of the {@link ReduceByKey} operator
     * into the value handed to the reduce function supplied later.
     *
     * @param <T> type of the extracted values
     * @param valueExtractor user defined function extracting the value to reduce from an input
     *     element
     * @param valueType {@link TypeDescriptor} of the value type {@code <T>}, or {@code null} when
     *     it is not specified
     * @return the next builder to complete the setup of the {@link ReduceByKey} operator
     */
    <T> ReduceByBuilder<KeyT, T> valueBy(
        UnaryFunction<InputT, T> valueExtractor, @Nullable TypeDescriptor<T> valueType);

    /**
     * Variant of {@link #valueBy(UnaryFunction, TypeDescriptor)} that leaves the value type
     * unspecified ({@code null}).
     *
     * @param <T> type of the extracted values
     * @param valueExtractor user defined function extracting the value to reduce from an input
     *     element
     * @return the next builder to complete the setup of the {@link ReduceByKey} operator
     */
    default <T> ReduceByBuilder<KeyT, T> valueBy(UnaryFunction<InputT, T> valueExtractor) {
      final TypeDescriptor<T> unspecifiedValueType = null;
      return valueBy(valueExtractor, unspecifiedValueType);
    }
  }



  /**
   * Builder for 'withSortedValues' step.
   *
   * <p>The step is optional: this interface extends {@link WindowByBuilder}, so windowing or output
   * can be configured directly without calling {@link #withSortedValues}.
   *
   * @param <KeyT> type of the grouping key
   * @param <ValueT> type of the values being sorted and reduced
   * @param <OutputT> type of the reduced output
   */
  public interface WithSortedValuesBuilder<KeyT, ValueT, OutputT>
      extends WindowByBuilder<KeyT, OutputT> {

    /**
     * Sort values going to the {@code reduceBy} function by the given comparator.
     *
     * @param comparator function with contract defined by {@code java.util.Comparator#compare}.
     * @return next step builder
     */
    WindowByBuilder<KeyT, OutputT> withSortedValues(
		BinaryFunction<ValueT, ValueT, Integer> comparator);
  }



  /**
   * Internal builder for 'windowBy' step.
   *
   * @param <InputT> type of elements of the input dataset (the element type of the window)
   * @param <KeyT> type of the grouping key
   * @param <OutputT> type of the reduced output
   */
  @Internal
  public interface WindowByInternalBuilder<InputT, KeyT, OutputT> {

    /**
     * For internal use only. Set already constructed {@link Window}. This allows easier
     * construction of composite operators.
     *
     * @param window the already constructed window
     * @return output builder
     */
    OutputBuilder<KeyT, OutputT> windowBy(Window<InputT> window);
  }



  /**
   * Builder for the optional 'windowBy' step. As it is also an {@link OutputBuilder}, the output
   * can be built without configuring any windowing.
   *
   * @param <KeyT> type of the grouping key
   * @param <OutputT> type of the reduced output
   */
  public interface WindowByBuilder<KeyT, OutputT>
      extends Builders.WindowBy<TriggeredByBuilder<KeyT, OutputT>>,
          OptionalMethodBuilder<WindowByBuilder<KeyT, OutputT>, OutputBuilder<KeyT, OutputT>>,
          OutputBuilder<KeyT, OutputT> {

    /**
     * Sets the windowing function of the operator.
     *
     * @param <W> type of the windows produced by the windowing function
     * @param windowing the windowing function to use
     * @return builder for the 'triggeredBy' step
     */
    @Override
    <W extends BoundedWindow> TriggeredByBuilder<KeyT, OutputT> windowBy(
        WindowFn<Object, W> windowing);

    /**
     * Conditionally applies a user supplied continuation of the builder chain.
     *
     * @param cond when {@code true}, {@code fn} is applied to this builder; when {@code false},
     *     this builder is returned unchanged
     * @param fn function continuing the build from this builder; must not be {@code null} even
     *     when {@code cond} is {@code false}
     * @return the result of {@code fn}, or this builder
     */
    @Override
    default OutputBuilder<KeyT, OutputT> applyIf(
        boolean cond,
        UnaryFunction<WindowByBuilder<KeyT, OutputT>, OutputBuilder<KeyT, OutputT>> fn) {
      requireNonNull(fn);
      if (!cond) {
        return this;
      }
      return fn.apply(this);
    }
  }



  /**
   * Builder for 'triggeredBy' step.
   *
   * @param <KeyT> type of the grouping key
   * @param <OutputT> type of the reduced output
   */
  public interface TriggeredByBuilder<KeyT, OutputT>
      extends Builders.TriggeredBy<AccumulationModeBuilder<KeyT, OutputT>> {

    /**
     * Sets the windowing trigger.
     *
     * @param trigger the trigger to use with the configured windowing
     * @return builder for the 'accumulationMode' step
     */
    @Override
    AccumulationModeBuilder<KeyT, OutputT> triggeredBy(Trigger trigger);
  }



  /**
   * Builder for 'accumulationMode' step.
   *
   * @param <KeyT> type of the grouping key
   * @param <OutputT> type of the reduced output
   */
  public interface AccumulationModeBuilder<KeyT, OutputT>
      extends Builders.AccumulationMode<WindowedOutputBuilder<KeyT, OutputT>> {

    /**
     * Sets the windowing accumulation mode.
     *
     * @param accumulationMode the accumulation mode to use with the configured windowing
     * @return builder for the 'windowed output' step
     */
    @Override
    WindowedOutputBuilder<KeyT, OutputT> accumulationMode(
		WindowingStrategy.AccumulationMode accumulationMode);
  }



  /**
   * Builder for 'windowed output' step.
   *
   * <p>Combines {@code Builders.WindowedOutput}, parameterized with this same builder type, with
   * {@link OutputBuilder}, so the output can be built from this step. NOTE(review): the additional
   * window settings (allowed lateness, timestamp combiner, on-time behavior) are presumably
   * declared by {@code Builders.WindowedOutput} - confirm against that interface.
   *
   * @param <KeyT> type of the grouping key
   * @param <OutputT> type of the reduced output
   */
  public interface WindowedOutputBuilder<KeyT, OutputT>
      extends Builders.WindowedOutput<WindowedOutputBuilder<KeyT, OutputT>>,
          OutputBuilder<KeyT, OutputT> {}



  /**
   * Builder for 'output' step.
   *
   * <p>Terminal step of the chain: combines {@code Builders.Output} for {@code KV<KeyT, OutputT>}
   * pairs with {@code Builders.OutputValues}, matching the {@code (output | outputValues)} step
   * listed in the operator documentation.
   *
   * @param <KeyT> type of the grouping key
   * @param <OutputT> type of the reduced output
   */
  public interface OutputBuilder<KeyT, OutputT>
      extends Builders.Output<KV<KeyT, OutputT>>, Builders.OutputValues<KeyT, OutputT> {}



  /**
   * Builder for ReduceByKey operator.
   *
   * <p>One mutable instance implements every step interface of the fluent API. Each step stores
   * its argument and returns this same instance; steps that introduce a new type parameter
   * re-type the instance through an unchecked cast.
   *
   * @param <InputT> type of input
   * @param <KeyT> type of key
   * @param <ValueT> type of value
   * @param <OutputT> type of output
   */
  static class Builder<InputT, KeyT, ValueT, OutputT>
      implements OfBuilder,
          KeyByBuilder<InputT>,
          ValueByReduceByBuilder<InputT, KeyT, ValueT>,
          WithSortedValuesBuilder<KeyT, ValueT, OutputT>,
          WindowByInternalBuilder<InputT, KeyT, OutputT>,
          WindowByBuilder<KeyT, OutputT>,
          TriggeredByBuilder<KeyT, OutputT>,
          AccumulationModeBuilder<KeyT, OutputT>,
          WindowedOutputBuilder<KeyT, OutputT>,
          OutputBuilder<KeyT, OutputT> {

    /** Collects all windowing related settings; queried once when the operator is created. */
    private final WindowBuilder<InputT> windowBuilder = new WindowBuilder<>();

    @Nullable private final String name;
    private PCollection<InputT> input;
    private UnaryFunction<InputT, KeyT> keyExtractor;
    @Nullable private TypeDescriptor<KeyT> keyType;
    /** Stays {@code null} until 'valueBy' is called or 'reduceBy' installs the identity. */
    @Nullable private UnaryFunction<InputT, ValueT> valueExtractor;
    @Nullable private TypeDescriptor<ValueT> valueType;
    private ReduceFunctor<ValueT, OutputT> reducer;
    @Nullable private TypeDescriptor<OutputT> outputType;
    @Nullable private BinaryFunction<ValueT, ValueT, Integer> valueComparator;

    /**
     * Creates a builder for an operator of the given name.
     *
     * @param name name of the operator, or {@code null} for a nameless one
     */
    Builder(@Nullable String name) {
      this.name = name;
    }

    /**
     * Sets the input dataset.
     *
     * @param <T> type of elements of the input dataset
     * @param input the dataset to be processed
     * @return this builder, typed for the 'keyBy' step
     * @throws NullPointerException if {@code input} is {@code null}
     */
    @Override
    public <T> KeyByBuilder<T> of(PCollection<T> input) {
      @SuppressWarnings({"unchecked", "rawtypes"})
      final Builder<T, ?, ?, ?> cast = (Builder) this;
      // Validated like the arguments of the other steps, instead of carrying a null input
      // silently until output() is called.
      cast.input = requireNonNull(input);
      return cast;
    }

    /**
     * Sets the key extractor and the optional key type.
     *
     * @param <T> type of the extracted key
     * @param keyExtractor function deriving the key from an input element
     * @param keyType {@link TypeDescriptor} of the key, or {@code null} when not specified
     * @return this builder, typed for the 'valueBy' / 'reduceBy' step
     * @throws NullPointerException if {@code keyExtractor} is {@code null}
     */
    @Override
    public <T> ValueByReduceByBuilder<InputT, T, InputT> keyBy(
        UnaryFunction<InputT, T> keyExtractor, @Nullable TypeDescriptor<T> keyType) {
      @SuppressWarnings({"unchecked", "rawtypes"})
      final Builder<InputT, T, InputT, ?> cast = (Builder) this;
      cast.keyExtractor = requireNonNull(keyExtractor);
      cast.keyType = keyType;
      return cast;
    }

    /**
     * Sets the value extractor and the optional value type.
     *
     * @param <T> type of the extracted values
     * @param valueExtractor function deriving the value to reduce from an input element
     * @param valueType {@link TypeDescriptor} of the value, or {@code null} when not specified
     * @return this builder, typed for the 'reduceBy' step
     * @throws NullPointerException if {@code valueExtractor} is {@code null}
     */
    @Override
    public <T> ReduceByBuilder<KeyT, T> valueBy(
        UnaryFunction<InputT, T> valueExtractor, @Nullable TypeDescriptor<T> valueType) {
      @SuppressWarnings({"unchecked", "rawtypes"})
      final Builder<InputT, KeyT, T, ?> cast = (Builder) this;
      cast.valueExtractor = requireNonNull(valueExtractor);
      cast.valueType = valueType;
      return cast;
    }

    /**
     * Sets the reduce functor and the optional output type.
     *
     * @param <T> type of the reduced output
     * @param reducer functor reducing the values of one key
     * @param outputType {@link TypeDescriptor} of the output, or {@code null} when not specified
     * @return this builder, typed for the 'withSortedValues' step
     * @throws NullPointerException if {@code reducer} is {@code null}
     */
    @Override
    // Method scope is required: the raw identity assignment below is a statement and cannot
    // carry its own annotation.
    @SuppressWarnings({"unchecked", "rawtypes"})
    public <T> WithSortedValuesBuilder<KeyT, ValueT, T> reduceBy(
        ReduceFunctor<ValueT, T> reducer, @Nullable TypeDescriptor<T> outputType) {
      if (valueExtractor == null) {
        // if the valueExtractor was not set in 'valueBy' step, we use untouched input element
        valueExtractor = (UnaryFunction) UnaryFunction.identity();
      }
      final Builder<InputT, KeyT, ValueT, T> cast = (Builder) this;
      cast.reducer = requireNonNull(reducer);
      cast.outputType = outputType;
      return cast;
    }

    /**
     * Sets the comparator used to sort values before they are passed to the reducer.
     *
     * @param valueComparator function with contract of {@code java.util.Comparator#compare}
     * @return this builder, typed for the 'windowBy' step
     * @throws NullPointerException if {@code valueComparator} is {@code null}
     */
    @Override
    public WindowByBuilder<KeyT, OutputT> withSortedValues(
        BinaryFunction<ValueT, ValueT, Integer> valueComparator) {
      this.valueComparator = requireNonNull(valueComparator);
      return this;
    }

    /**
     * For internal use only: hands an already constructed {@link Window} to the window builder.
     *
     * @param window the already constructed window
     * @return this builder, typed for the 'output' step
     */
    @Override
    public OutputBuilder<KeyT, OutputT> windowBy(Window<InputT> window) {
      windowBuilder.setWindow(window);
      return this;
    }

    /**
     * Forwards the windowing function to the window builder.
     *
     * @param <W> type of the windows produced by the windowing function
     * @param windowFn the windowing function to use
     * @return this builder, typed for the 'triggeredBy' step
     */
    @Override
    public <W extends BoundedWindow> TriggeredByBuilder<KeyT, OutputT> windowBy(
        WindowFn<Object, W> windowFn) {
      windowBuilder.windowBy(windowFn);
      return this;
    }

    /**
     * Forwards the trigger to the window builder.
     *
     * @param trigger the windowing trigger
     * @return this builder, typed for the 'accumulationMode' step
     */
    @Override
    public AccumulationModeBuilder<KeyT, OutputT> triggeredBy(Trigger trigger) {
      windowBuilder.triggeredBy(trigger);
      return this;
    }

    /**
     * Forwards the accumulation mode to the window builder.
     *
     * @param accumulationMode the windowing accumulation mode
     * @return this builder, typed for the 'windowed output' step
     */
    @Override
    public WindowedOutputBuilder<KeyT, OutputT> accumulationMode(
        WindowingStrategy.AccumulationMode accumulationMode) {
      windowBuilder.accumulationMode(accumulationMode);
      return this;
    }

    /**
     * Forwards the allowed lateness to the window builder.
     *
     * @param allowedLateness the allowed lateness
     * @return this builder
     */
    @Override
    public WindowedOutputBuilder<KeyT, OutputT> withAllowedLateness(Duration allowedLateness) {
      windowBuilder.withAllowedLateness(allowedLateness);
      return this;
    }

    /**
     * Forwards the allowed lateness and the closing behavior to the window builder.
     *
     * @param allowedLateness the allowed lateness
     * @param closingBehavior the window closing behavior
     * @return this builder
     */
    @Override
    public WindowedOutputBuilder<KeyT, OutputT> withAllowedLateness(
        Duration allowedLateness, Window.ClosingBehavior closingBehavior) {
      windowBuilder.withAllowedLateness(allowedLateness, closingBehavior);
      return this;
    }

    /**
     * Forwards the timestamp combiner to the window builder.
     *
     * @param timestampCombiner the timestamp combiner
     * @return this builder
     */
    @Override
    public WindowedOutputBuilder<KeyT, OutputT> withTimestampCombiner(
        TimestampCombiner timestampCombiner) {
      windowBuilder.withTimestampCombiner(timestampCombiner);
      return this;
    }

    /**
     * Forwards the on-time behavior to the window builder.
     *
     * @param behavior the on-time behavior
     * @return this builder
     */
    @Override
    public WindowedOutputBuilder<KeyT, OutputT> withOnTimeBehavior(Window.OnTimeBehavior behavior) {
      windowBuilder.withOnTimeBehavior(behavior);
      return this;
    }

    /**
     * Creates the operator and applies it to the input, producing key/output pairs.
     *
     * @param outputHints not used by this implementation
     * @return the reduced dataset of {@code KV<KeyT, OutputT>} pairs
     */
    @Override
    public PCollection<KV<KeyT, OutputT>> output(OutputHint... outputHints) {
      return OperatorTransform.apply(createOperator(), PCollectionList.of(input));
    }

    /**
     * Creates the operator, wraps it in {@code OutputValues} and applies it to the input.
     *
     * @param outputHints not used by this implementation
     * @return the dataset of reduced values
     */
    @Override
    public PCollection<OutputT> outputValues(OutputHint... outputHints) {
      return OperatorTransform.apply(
          new OutputValues<>(name, outputType, createOperator()), PCollectionList.of(input));
    }

    /**
     * Instantiates the operator from the state collected so far. A missing window is passed as
     * {@code null}; missing key / output type descriptors are replaced by the result of {@code
     * TypeAwareness.orObjects} when composing the output {@code KV} descriptor.
     */
    private ReduceByKey<InputT, KeyT, ValueT, OutputT> createOperator() {
      return new ReduceByKey<>(
          name,
          keyExtractor,
          keyType,
          valueExtractor,
          valueType,
          reducer,
          valueComparator,
          windowBuilder.getWindow().orElse(null),
          TypeDescriptors.kvs(
              TypeAwareness.orObjects(Optional.ofNullable(keyType)),
              TypeAwareness.orObjects(Optional.ofNullable(outputType))));
    }
  }



  /** Functor reducing the extracted values of a single key. */
  private final ReduceFunctor<ValueT, OutputT> reducer;

  /** Derives the value to reduce from each input element. */
  private final UnaryFunction<InputT, ValueT> valueExtractor;

  /** Optional ordering of the values handed to the reducer; {@code null} when not requested. */
  @Nullable private final BinaryFunction<ValueT, ValueT, Integer> valueComparator;

  /** Optional description of the value type; {@code null} when not specified. */
  @Nullable private final TypeDescriptor<ValueT> valueType;

  /**
   * Creates the operator; instances are obtained through the builder only.
   *
   * @param name name of the operator, may be {@code null}
   * @param keyExtractor function deriving the key from an input element
   * @param keyType description of the key type, may be {@code null}
   * @param valueExtractor function deriving the value to reduce from an input element
   * @param valueType description of the value type, may be {@code null}
   * @param reducer functor reducing the values of one key
   * @param valueComparator ordering of values passed to the reducer, may be {@code null}
   * @param window windowing to apply, may be {@code null}
   * @param outputType description of the produced key/output pairs
   */
  private ReduceByKey(
      @Nullable String name,
      UnaryFunction<InputT, KeyT> keyExtractor,
      @Nullable TypeDescriptor<KeyT> keyType,
      UnaryFunction<InputT, ValueT> valueExtractor,
      @Nullable TypeDescriptor<ValueT> valueType,
      ReduceFunctor<ValueT, OutputT> reducer,
      @Nullable BinaryFunction<ValueT, ValueT, Integer> valueComparator,
      @Nullable Window<InputT> window,
      TypeDescriptor<KV<KeyT, OutputT>> outputType) {
    super(name, outputType, keyExtractor, keyType, window);
    // Value related state first, then the reduction itself.
    this.valueExtractor = valueExtractor;
    this.valueType = valueType;
    this.valueComparator = valueComparator;
    this.reducer = reducer;
  }



  /**
   * Returns the functor reducing the values of one key.
   *
   * @return the reduce functor this operator was built with
   */
  public ReduceFunctor<ValueT, OutputT> getReducer() {
    return this.reducer;
  }



  /**
   * Tells whether the reduce functor reports itself as combinable.
   *
   * @return the result of {@code isCombinable()} of the reduce functor
   */
  public boolean isCombinable() {
    final boolean combinable = this.reducer.isCombinable();
    return combinable;
  }



  /**
   * Returns the function deriving the value to reduce from an input element.
   *
   * @return the value extractor this operator was built with
   */
  public UnaryFunction<InputT, ValueT> getValueExtractor() {
    return this.valueExtractor;
  }



  /**
   * Returns the comparator used to sort values before reduction, if one was configured.
   *
   * @return the value comparator, or an empty {@link Optional} when none was set
   */
  public Optional<BinaryFunction<ValueT, ValueT, Integer>> getValueComparator() {
    if (valueComparator == null) {
      return Optional.empty();
    }
    return Optional.of(valueComparator);
  }



  /**
   * Returns the description of the value type, if one was specified.
   *
   * @return the value type descriptor, or an empty {@link Optional} when none was set
   */
  @Override
  public Optional<TypeDescriptor<ValueT>> getValueType() {
    if (valueType == null) {
      return Optional.empty();
    }
    return Optional.of(valueType);
  }

}